using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Naruto.Subscribe.Provider.Kafka.Extension;
using Naruto.Subscribe.Provider.Kafka.Interface;
using Naruto.Subscribe.Provider.Kafka.Options;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Naruto.Subscribe.Provider.Kafka.Internal
{
    /// <summary>
    /// 消费者 连接 工厂的默认实现
    /// </summary>
    public class ConsumerConnectionFactory : IConsumerConnectionFactory, IDisposable
    {
        /// <summary>
        /// 日志接口
        /// </summary>
        private readonly ILogger _logger;
        /// <summary>
        /// kafka的消费者
        /// </summary>
        protected IConsumer<string, byte[]> _consumer;

        /// <summary>
        /// 定义一个信号处理 保证多线程中的原子性
        /// 默认只允许一个信号
        /// </summary>
        private static readonly SemaphoreSlim _connectionSemaphoreSlim = new SemaphoreSlim(1, 1);

        /// <summary>
        /// 配置信息
        /// </summary>
        private readonly KafkaOption _kafkaOption;

        /// <summary>
        /// 
        /// </summary>
        /// <param name="logger"></param>
        public ConsumerConnectionFactory(ILogger<ConsumerConnectionFactory> logger, IOptions<KafkaOption> options)
        {
            _logger = logger;
            _kafkaOption = options.Value;
        }

        /// <summary>
        /// 
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
        }

        /// <summary>
        /// 释放
        /// </summary>
        /// <param name="isDispose"></param>
        protected virtual void Dispose(bool isDispose)
        {
            if (isDispose)
            {
                _consumer?.Dispose();
                _consumer = null;
            }
        }

        /// <summary>
        /// 获取
        /// </summary>
        /// <returns></returns>
        public async Task<IConsumer<string, byte[]>> GetAsync()
        {
            await this.ConnectionConsumer();
            return _consumer;
        }
        public IConsumer<string, byte[]> Get()
        {
            //初始化配置信息
            var config = new ConsumerConfig(_kafkaOption.Config ?? new Dictionary<string, string>());
            //服务地址
            config.BootstrapServers ??= _kafkaOption.BrokersServices.ToBrokersService();
            //消费者组
            config.GroupId ??= _kafkaOption.ConsumerGroup;
            //关闭自动提交
            config.EnableAutoCommit ??= false;
            //从最早的开始读取
            config.AutoOffsetReset ??= AutoOffsetReset.Earliest;
            //自动创建主题
            config.AllowAutoCreateTopics ??= true;
            //初始化消费者 定义返回的消息格式为二进制 为了后期可以支持messagepack
            return new ConsumerBuilder<string, byte[]>(config).SetErrorHandler(ErrorHander).Build();
        }


        /// <summary>
        /// 获取消费者
        /// </summary>
        /// <returns></returns>
        protected virtual async Task ConnectionConsumer()
        {
            //验证消费者是否已经初始化
            if (_consumer != null) return;

            //阻止
            await _connectionSemaphoreSlim.WaitAsync();

            try
            {
                //二次验证
                if (_consumer != null) return;
                //初始化配置信息
                var config = new ConsumerConfig(_kafkaOption.Config);
                //服务地址
                config.BootstrapServers ??= _kafkaOption.BrokersServices.ToBrokersService();
                //消费者组
                config.GroupId ??= _kafkaOption.ConsumerGroup;
                //关闭自动提交
                config.EnableAutoCommit ??= false;
                //从最早的开始读取
                config.AutoOffsetReset ??= AutoOffsetReset.Earliest;
                //自动创建主题
                config.AllowAutoCreateTopics ??= true;
                //初始化消费者 定义返回的消息格式为二进制 为了后期可以支持messagepack
                _consumer = new ConsumerBuilder<string, byte[]>(config).SetErrorHandler(ErrorHander).Build();
            }
            catch (Exception ex)
            {
                _logger.LogError("消费者连接异常,error={error}", ex.Message);
                throw;
            }
            finally
            {
                //释放信号
                _connectionSemaphoreSlim.Release();
            }
        }
        /// <summary>
        /// 错误处理
        /// </summary>
        /// <param name="consumer"></param>
        /// <param name="error"></param>
        private void ErrorHander(IConsumer<string, byte[]> consumer, Error error)
        {
            _logger.LogError($"{nameof(ErrorHander)},error={error.Reason}");
        }
    }
}
